{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Time Series Forecasting\n",
    "\n",
    "This example shows how to use [Prophet](https://facebook.github.io/prophet/) and Dask for scalable time series forecasting.\n",
    "\n",
    "> Prophet is a procedure for forecasting time series data based on an additive model where non-linear trends are fit with yearly, weekly, and daily seasonality, plus holiday effects.\n",
    "\n",
    "As discussed in [*Forecasting at scale*](https://peerj.com/preprints/3190/), large datasets aren't the only type of scaling challenge teams run into. In this example we'll focus on the third type of scaling challenge identified in that paper:\n",
    "\n",
    "> [I]n most realistic settings, a large number of forecasts will be created, necessitating efficient, automated means of evaluating and comparing them, as well as detecting when they are likely to be performing poorly. When hundreds or even thousands of forecasts are made, it becomes important to let machines do the hard work of model evaluation and comparison while efficiently using human feedback to fix performance problems.\n",
    "\n",
    "That sounds like a perfect opportunity for Dask. We'll use Prophet and Dask together to parallelize the *diagnostics* stage of research. This example does not attempt to parallelize the training of the model itself."
   ]
  },
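  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This example currently relies on Prophet master\n",
    "import subprocess\n",
    "\n",
    "# Each requirement is passed as its own argument; a single space-separated\n",
    "# string would reach pip as one (invalid) requirement.\n",
    "subprocess.call([\n",
    "    \"pip\", \"install\",\n",
    "    \"Cython\", \"cmdstanpy==0.9.5\", \"pystan\", \"numpy\", \"pandas\", \"matplotlib\",\n",
    "    \"LunarCalendar\", \"convertdate\", \"holidays\", \"setuptools-git\", \"python-dateutil\", \"tqdm\",\n",
    "])\n",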
    "\n",
    "subprocess.call([\n",
    "    \"pip\",\n",
    "    \"install\",\n",
    "    \"git+https://github.com/facebook/prophet/#egg=fbprophet&subdirectory=python\"\n",
    "])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "from fbprophet import Prophet"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We'll walk through the example from the Prophet quickstart. These values represent the log of daily page views for [Peyton Manning's Wikipedia page](https://en.wikipedia.org/wiki/Peyton_Manning)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = pd.read_csv(\n",
    "    'https://raw.githubusercontent.com/facebook/prophet/master/examples/example_wp_log_peyton_manning.csv',\n",
    "    parse_dates=['ds']\n",
    ")\n",
    "df.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.plot(x='ds', y='y');"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Fitting the model takes a handful of seconds. Dask isn't involved at all here."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "m = Prophet(daily_seasonality=False)\n",
    "m.fit(df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And we can make a forecast. Again, Dask isn't involved here."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "future = m.make_future_dataframe(periods=365)\n",
    "forecast = m.predict(future)\n",
    "m.plot(forecast);"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Parallel Diagnostics\n",
    "\n",
    "Prophet includes a `fbprophet.diagnostics.cross_validation` function, which uses *simulated historical forecasts* to provide some idea of a model's quality.\n",
    "\n",
    "> This is done by selecting cutoff points in the history, and for each of them fitting the model using data only up to that cutoff point. We can then compare the forecasted values to the actual values.\n",
    "\n",
    "See https://facebook.github.io/prophet/docs/diagnostics.html for more.\n",
    "\n",
    "Internally, `cross_validation` generates a list of cutoff values to try. Prophet fits a model and computes some metrics for each of these. By default each model is fit sequentially, but the models can be trained in parallel using the `parallel=` keyword. On a single machine `parallel=\"processes\"` is a good choice. For large problems where you'd like to distribute the work on a cluster, use `parallel=\"dask\"` after you've connected to the cluster by creating a `Client`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask\n",
    "from distributed import Client, performance_report\n",
    "import fbprophet.diagnostics\n",
    "\n",
    "client = Client(threads_per_worker=1)\n",
    "client"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "df_cv = fbprophet.diagnostics.cross_validation(\n",
    "    m, initial=\"730 days\", period=\"180 days\", horizon=\"365 days\",\n",
    "    parallel=\"dask\"\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Be sure to watch the Dask Dashboard as that runs. The models are fit in parallel on the cluster. At the start there's a bit of overhead from having to move the model and data to the workers, but after that the scaling looks quite nice."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
